package cn.doitedu.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class ExtractTimeStampInterceptor implements Interceptor {
    String key_name;
    public ExtractTimeStampInterceptor(String key_name){
        this.key_name = key_name;
    }

    ObjectMapper objectMapper;
    /**
     * 拦截器被实例化后，立即执行一次的方法
     * 可以用于一些初始化操作
     */
    @Override
    public void initialize() {
        objectMapper = new ObjectMapper();
    }

    /**
     * 拦截逻辑所在的方法 : 针对一条数据
     * @param event
     * @return
     */
    @Override
    public Event intercept(Event event) {

        // 从event中拿到body内容（字节数组）
        byte[] bodyBytes = event.getBody();

        // 将字节数组反序列化成String （日志是json）
        String logJson = new String(bodyBytes);

        try {
            // 从数据中抽取时间戳
            JsonNode rootNode = objectMapper.readTree(logJson);
            long timeStamp = rootNode.get("timeStamp").getValueAsLong();

            // 放到event的header中
            Map<String, String> headers = event.getHeaders();
            headers.put(key_name,timeStamp+"");

        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
        return event;
    }


    /**
     *拦截逻辑所在的方法 : 针对一批数据
     * @param list
     * @return
     */
    @Override
    public List<Event> intercept(List<Event> list) {

        for (Event event : list) {
            intercept(event);
        }
        return list;
    }


    /**
     * agent要退出前，会调用一次
     * 可以用于做一些资源关闭、清理的操作
     */
    @Override
    public void close() {


    }

    public static class ExtractTimeStampInterceptorBuilder implements Interceptor.Builder{
        String key_name;
        @Override
        public Interceptor build() {
            return new ExtractTimeStampInterceptor(key_name);
        }

        @Override
        public void configure(Context context) {
            key_name = context.getString("key_name","timestamp");


        }
    }



}
